/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.end2end;

import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlan;
import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlanWithLimit;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PAGED_ROWS_COUNTER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.phoenix.iterate.ScanningResultPostDummyResultCaller;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.TestUtil;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;

/**
 * Uncovered index tests that include some region moves while performing rs#next.
 */
@Category(NeedsOwnMiniClusterTest.class)
public class UncoveredIndexWithRegionMovesIT extends ParallelStatsDisabledIT {

  private static final Logger LOGGER =
    LoggerFactory.getLogger(UncoveredIndexWithRegionMovesIT.class);

  // Gate for TestScanningResultPostDummyResultCaller#postDummyProcess: each test sets it to true
  // on entry and tearDown() resets it to false, so the hook is a no-op outside a running test.
  // NOTE(review): plain (non-volatile) static; if the hook runs on a thread other than the test
  // thread, visibility of this write is not guaranteed - confirm.
  private static boolean hasTestStarted = false;
  // Incremented by postDummyProcess() on every invocation that happens while hasTestStarted is
  // true; reset to 0 in tearDown().
  private static int countOfDummyResults = 0;
  // Tables whose regions postDummyProcess() moves. Each test registers its data table and index
  // table here; tearDown() clears the list.
  private static final List<String> TABLE_NAMES = Collections.synchronizedList(new ArrayList<>());

  /**
   * Hook registered through {@link QueryServices#PHOENIX_POST_DUMMY_PROCESS} in
   * {@code doSetup()}. While a test is running it periodically moves the regions of every table
   * listed in {@code TABLE_NAMES}.
   * <p>
   * NOTE(review): presumably invoked by the Phoenix client each time a scan returns a dummy
   * result - confirm against {@link ScanningResultPostDummyResultCaller}.
   */
  private static class TestScanningResultPostDummyResultCaller
    extends ScanningResultPostDummyResultCaller {

    /**
     * Moves the regions of all registered tables on every third counted invocation, but only
     * while the invocation counter is inside one of two windows (below 17, or strictly between
     * 28 and 40). Does nothing, and does not count the invocation, when no test is running.
     */
    @Override
    public void postDummyProcess() {
      if (!hasTestStarted) {
        return;
      }
      // Post-increment: the modulo test sees the value before this call, while the window
      // checks below see the already incremented value.
      boolean isEveryThirdCall = (countOfDummyResults++ % 3) == 0;
      if (!isEveryThirdCall) {
        return;
      }
      boolean inFirstWindow = countOfDummyResults < 17;
      boolean inSecondWindow = countOfDummyResults > 28 && countOfDummyResults < 40;
      if (!inFirstWindow && !inSecondWindow) {
        return;
      }
      LOGGER.info("Moving regions of tables {}. current count of dummy results: {}", TABLE_NAMES,
        countOfDummyResults);
      TABLE_NAMES.forEach(TestScanningResultPostDummyResultCaller::moveRegionsUnchecked);
    }

    /** Moves the regions of one table, rethrowing any {@link IOException} unchecked. */
    private static void moveRegionsUnchecked(String table) {
      try {
        moveRegionsOfTable(table);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

  /**
   * Configures and starts the test driver for this class: a two region server mini cluster with
   * request-level metrics enabled, the server page size set to 0 ms, the HBase client scanner
   * max result size set to 1, and {@link TestScanningResultPostDummyResultCaller} registered as
   * the post-dummy-result hook.
   * @throws Exception if the test driver cannot be set up
   */
  @BeforeClass
  public static synchronized void doSetup() throws Exception {
    // Capacity hint matches the five properties inserted below.
    Map<String, String> props = Maps.newHashMapWithExpectedSize(5);
    props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(0));
    props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
    // moveRegionsOfTable() swaps regions between exactly two region servers.
    props.put(QueryServices.TESTS_MINI_CLUSTER_NUM_REGION_SERVERS, String.valueOf(2));
    props.put(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, String.valueOf(1));
    props.put(QueryServices.PHOENIX_POST_DUMMY_PROCESS,
      TestScanningResultPostDummyResultCaller.class.getName());
    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
  }

  /**
   * Resets the shared static state after each test: disables the dummy-result hook, zeroes its
   * invocation counter and forgets the tables registered by the test.
   */
  @After
  public void tearDown() throws Exception {
    hasTestStarted = false;
    countOfDummyResults = 0;
    TABLE_NAMES.clear();
  }

  /**
   * Checks the request-level read metrics attached to {@code rs}.
   * <p>
   * Every table reported in the metrics must be {@code tableName}, and its
   * {@link MetricType#PAGED_ROWS_COUNTER} must be present and either positive (when
   * {@code isPaged}) or exactly zero (otherwise). Independently of {@code isPaged}, the global
   * paged rows counter must be positive.
   * @param tableName the only table expected to appear in the read metrics
   * @param rs        result set whose request-level read metrics are inspected
   * @param isPaged   whether server paging is expected to have happened for this query
   * @throws SQLException if the metrics cannot be read from the result set
   */
  private void assertServerPagingMetric(String tableName, ResultSet rs, boolean isPaged)
    throws SQLException {
    Map<String, Map<MetricType, Long>> readMetrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
    readMetrics.forEach((metricTable, valuesByType) -> {
      assertEquals(String.format("Got %s", metricTable), tableName, metricTable);
      Long pagedRows = valuesByType.get(MetricType.PAGED_ROWS_COUNTER);
      assertNotNull(pagedRows);
      String failureMessage = String.format("Got %d", pagedRows);
      if (!isPaged) {
        assertEquals(failureMessage, 0, (long) pagedRows);
        return;
      }
      assertTrue(failureMessage, pagedRows > 0);
    });
    assertTrue(GLOBAL_PAGED_ROWS_COUNTER.getMetric().getValue() > 0);
  }

  /**
   * Swaps the regions of {@code tableName} between the first two region servers reported by the
   * cluster: regions currently on the first server are moved to the second, and the regions that
   * were on the second server are moved to the first.
   * <p>
   * This is best effort. Any failure (including a move that does not complete within its
   * timeout) aborts the remaining moves and is logged rather than propagated, so a failed move
   * does not fail the calling test by itself.
   * @param tableName name of the table whose regions should be moved
   * @throws IOException declared for callers; failures are currently logged, not thrown
   */
  private static void moveRegionsOfTable(String tableName) throws IOException {
    try (AsyncConnection asyncConnection =
      ConnectionFactory.createAsyncConnection(getUtility().getConfiguration()).get()) {
      AsyncAdmin admin = asyncConnection.getAdmin();
      List<ServerName> servers = new ArrayList<>(admin.getRegionServers().get());
      ServerName server1 = servers.get(0);
      ServerName server2 = servers.get(1);
      // Snapshot both servers before moving anything so that regions just moved onto server2
      // are not immediately moved back by the second pass.
      List<RegionInfo> regionsOnServer1 = admin.getRegions(server1).get();
      List<RegionInfo> regionsOnServer2 = admin.getRegions(server2).get();
      TableName table = TableName.valueOf(tableName);
      moveTableRegionsTo(admin, table, regionsOnServer1, server2);
      moveTableRegionsTo(admin, table, regionsOnServer2, server1);
    } catch (InterruptedException e) {
      // Keep the interrupt visible to the caller even though the failure itself is swallowed.
      Thread.currentThread().interrupt();
      LOGGER.error("Something went wrong..", e);
    } catch (Exception e) {
      LOGGER.error("Something went wrong..", e);
    }
  }

  /**
   * Moves every region in {@code regions} that belongs to {@code table} to {@code destination}.
   * A region is only moved while the table has no regions in transition and all of its regions
   * are open; that condition is checked at most twice per region, without waiting in between,
   * and the region is skipped if it does not hold. Each move is awaited for up to 3 seconds.
   */
  private static void moveTableRegionsTo(AsyncAdmin admin, TableName table,
    List<RegionInfo> regions, ServerName destination)
    throws InterruptedException, ExecutionException, TimeoutException {
    for (RegionInfo regionInfo : regions) {
      if (!regionInfo.getTable().equals(table)) {
        continue;
      }
      for (int attempt = 0; attempt < 2; attempt++) {
        RegionStatesCount regionStatesCount =
          admin.getClusterMetrics().get().getTableRegionStatesCount().get(table);
        boolean tableIsStable = regionStatesCount.getRegionsInTransition() == 0
          && regionStatesCount.getOpenRegions() == regionStatesCount.getTotalRegions();
        if (tableIsStable) {
          LOGGER.info("Moving region {} to {}", regionInfo.getRegionNameAsString(), destination);
          admin.move(regionInfo.getEncodedNameAsBytes(), destination).get(3, TimeUnit.SECONDS);
          break;
        }
        LOGGER.info("Table {} has some region(s) in RIT or not online",
          table.getNameAsString());
      }
    }
  }

  /**
   * Runs a filtered LIMIT query, a GROUP BY count, a plain count and an ORDER BY query against
   * a table with an uncovered index on {@code val1}, moving the regions of the data and index
   * tables between {@code ResultSet#next()} calls, and checks that the rows are still returned
   * as expected.
   */
  @Test
  public void testUncoveredQueryWithGroupBy() throws Exception {
    hasTestStarted = true;
    final String dataTable = generateUniqueName();
    // Seeds two rows: ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde').
    populateTable(dataTable);
    try (Connection connection = DriverManager.getConnection(getUrl())) {
      final String indexTable = generateUniqueName();
      connection.createStatement()
        .execute("CREATE UNCOVERED INDEX " + indexTable + " on " + dataTable + " (val1) ");
      TABLE_NAMES.add(dataTable);
      TABLE_NAMES.add(indexTable);

      // 1. Filtered query with LIMIT: the plan must use the uncovered index without a hint.
      final int rowLimit = 10;
      final String limitQuery = "SELECT  val2, val3 from " + dataTable
        + " WHERE val1 = 'bc' AND (val2 = 'bcd' OR val3 ='bcde') LIMIT " + rowLimit;
      assertExplainPlanWithLimit(connection, limitQuery, dataTable, indexTable, rowLimit);

      ResultSet limitRows = connection.createStatement().executeQuery(limitQuery);
      moveRegionsOfTable(dataTable);
      moveRegionsOfTable(indexTable);
      assertTrue(limitRows.next());
      assertEquals("bcd", limitRows.getString(1));
      assertEquals("bcde", limitRows.getString(2));
      assertFalse(limitRows.next());
      assertServerPagingMetric(indexTable, limitRows, true);

      // 2. GROUP BY: add a third row that shares val1 = 'ab' with row 'a', so the groups have
      // sizes 2 and 1.
      connection.createStatement().execute("upsert into " + dataTable
        + " (id, val1, val2, val3) values ('c', 'ab','cde', 'cdef')");
      connection.commit();

      final String groupByQuery =
        "SELECT count(val3) from " + dataTable + " where val1 > '0' GROUP BY val1";
      // The plan must read from the index table.
      assertExplainPlan(connection, groupByQuery, dataTable, indexTable);
      ResultSet groupCounts = connection.createStatement().executeQuery(groupByQuery);

      TestUtil.dumpTable(connection, TableName.valueOf(dataTable));
      TestUtil.dumpTable(connection, TableName.valueOf(indexTable));

      moveRegionsOfTable(dataTable);
      moveRegionsOfTable(indexTable);
      assertTrue(groupCounts.next());
      assertEquals(2, groupCounts.getInt(1));
      assertTrue(groupCounts.next());
      assertEquals(1, groupCounts.getInt(1));
      moveRegionsOfTable(dataTable);
      moveRegionsOfTable(indexTable);
      assertFalse(groupCounts.next());

      // 3. Ungrouped count over the same predicate; no explicit region moves here.
      final String countQuery = "SELECT count(val3) from " + dataTable + " where val1 > '0'";
      // The plan must read from the index table.
      assertExplainPlan(connection, countQuery, dataTable, indexTable);
      ResultSet totalCount = connection.createStatement().executeQuery(countQuery);
      assertTrue(totalCount.next());
      assertEquals(3, totalCount.getInt(1));

      // 4. ORDER BY on the indexed column.
      final String orderByQuery =
        "SELECT val3 from " + dataTable + " where val1 > '0' ORDER BY val1";
      // The plan must read from the index table.
      assertExplainPlan(connection, orderByQuery, dataTable, indexTable);
      ResultSet orderedRows = connection.createStatement().executeQuery(orderByQuery);
      moveRegionsOfTable(dataTable);
      moveRegionsOfTable(indexTable);
      assertTrue(orderedRows.next());
      assertEquals("abcd", orderedRows.getString(1));
      assertTrue(orderedRows.next());
      assertEquals("cdef", orderedRows.getString(1));
      moveRegionsOfTable(dataTable);
      moveRegionsOfTable(indexTable);
      assertTrue(orderedRows.next());
      assertEquals("bcde", orderedRows.getString(1));
      moveRegionsOfTable(dataTable);
      moveRegionsOfTable(indexTable);
      assertFalse(orderedRows.next());
    }
  }

  /** Runs the shared scan-with-region-moves scenario without a LIMIT clause on the query. */
  @Test
  public void testUncoveredQuery() throws Exception {
    testUncoveredUtil(false);
  }

  /** Runs the shared scan-with-region-moves scenario with a LIMIT clause on the query. */
  @Test
  public void testUncoveredQueryWithLimit() throws Exception {
    testUncoveredUtil(true);
  }

  /**
   * Shared scenario for the uncovered-index scan tests. Creates a table with eleven rows and an
   * uncovered index on {@code val1}, selects {@code val2, val3} for all rows with a non-null
   * {@code val1}, and moves the regions of the data and index tables before selected
   * {@code ResultSet#next()} calls. All eleven rows must come back in order, and the server
   * paging metric for the index table must be positive.
   * @param limit whether to append {@code LIMIT 15} (larger than the row count) to the query
   */
  private void testUncoveredUtil(boolean limit) throws Exception {
    hasTestStarted = true;
    final String dataTable = generateUniqueName();
    // Seeds rows 'a' and 'b'.
    populateTable(dataTable);
    try (Connection connection = DriverManager.getConnection(getUrl())) {
      final String indexTable = generateUniqueName();
      connection.createStatement()
        .execute("CREATE UNCOVERED INDEX " + indexTable + " on " + dataTable + " (val1) ");

      // Rows 'c' through 'k', upserted after the index exists and committed together.
      final String[] extraRowValues = {
        "('c', 'cd','cde', 'cdef')",
        "('d', 'de','de1', 'de11')",
        "('e', 'ef','ef1', 'ef11')",
        "('f', 'fg','fg1', 'fg11')",
        "('g', 'gh','gh1', 'gh11')",
        "('h', 'hi','hi1', 'hi11')",
        "('i', 'ij','ij1', 'ij11')",
        "('j', 'jk','jk1', 'jk11')",
        "('k', 'kl','kl1', 'kl11')",
      };
      for (String rowValues : extraRowValues) {
        connection.createStatement()
          .execute("upsert into " + dataTable + " (id, val1, val2, val3) values " + rowValues);
      }
      connection.commit();
      TABLE_NAMES.add(dataTable);
      TABLE_NAMES.add(indexTable);

      String query = "SELECT  val2, val3 from " + dataTable + " WHERE val1 IS NOT NULL";
      if (limit) {
        query += " LIMIT 15";
      }

      // Expected (val2, val3) pairs in the order the query returns them.
      final String[][] expectedRows = {
        { "abc", "abcd" },
        { "bcd", "bcde" },
        { "cde", "cdef" },
        { "de1", "de11" },
        { "ef1", "ef11" },
        { "fg1", "fg11" },
        { "gh1", "gh11" },
        { "hi1", "hi11" },
        { "ij1", "ij11" },
        { "jk1", "jk11" },
        { "kl1", "kl11" },
      };
      // Indexes into expectedRows, ascending: regions are moved right before fetching these rows.
      final int[] rowsPrecededByMove = { 0, 1, 2, 6, 8 };

      ResultSet rows = connection.createStatement().executeQuery(query);
      int nextMove = 0;
      for (int row = 0; row < expectedRows.length; row++) {
        if (nextMove < rowsPrecededByMove.length && rowsPrecededByMove[nextMove] == row) {
          nextMove++;
          moveRegionsOfTable(dataTable);
          moveRegionsOfTable(indexTable);
        }
        assertTrue(rows.next());
        assertEquals(expectedRows[row][0], rows.getString(1));
        assertEquals(expectedRows[row][1], rows.getString(2));
      }
      assertFalse(rows.next());
      assertServerPagingMetric(indexTable, rows, true);
    }
  }

  /**
   * Creates {@code tableName} with a varchar primary key {@code id} and three varchar value
   * columns, then upserts and commits two rows, each in its own commit:
   * {@code ('a', 'ab', 'abc', 'abcd')} and {@code ('b', 'bc', 'bcd', 'bcde')}.
   * @param tableName name of the table to create and fill
   * @throws Exception if the connection cannot be opened or any statement fails
   */
  private void populateTable(String tableName) throws Exception {
    // try-with-resources so the connection is released even when a statement or commit fails.
    try (Connection conn = DriverManager.getConnection(getUrl())) {
      conn.createStatement()
        .execute("create table " + tableName
          + " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10),"
          + " val3 varchar(10))");
      conn.createStatement()
        .execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')");
      conn.commit();
      conn.createStatement()
        .execute("upsert into " + tableName + " values ('b', 'bc', 'bcd', 'bcde')");
      conn.commit();
    }
  }
}
